<?php
namespace Swork\Client;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use Swork\Exception\AmqpException;
use Swork\Pool\Amqp\AmqpConnection;
use Swork\Pool\PoolCollector;
use Swork\Pool\PoolInterface;
use Swork\Service;

/**
 * Small AMQP client that publishes messages to a single named queue using
 * connections taken from the 'default' AMQP pool.
 */
class Amqp
{
    /**
     * Pool the AMQP connections are fetched from.
     * @var PoolInterface
     */
    private $pool;

    /**
     * Name of the queue messages are published to.
     * @var string
     */
    private $queueName;

    /**
     * Whether the current queue has already been declared by this instance.
     * @var bool
     */
    private $declare = false;

    /**
     * Maximum number of additional publish attempts after a code-104 failure.
     * @var int
     */
    private $maxPublishRetries = 3;

    /**
     * Fetch the 'default' AMQP pool from the pool collector.
     */
    public function __construct()
    {
        $this->pool = PoolCollector::getCollector(PoolCollector::Amqp, 'default');
    }

    /**
     * Generic parameter entry point.
     *
     * Accepts either the queue name as a string, or an array carrying the
     * queue name under the 'name' key. Any other input is ignored.
     *
     * @param mixed $value queue name (string) or ['name' => queue name]
     */
    public function setParams($value)
    {
        if (is_string($value))
        {
            $this->setQueueName($value);
        }
        else if (is_array($value) && isset($value['name']))
        {
            $this->setQueueName($value['name']);
        }
    }

    /**
     * Set the name of the queue to publish to.
     *
     * @param string $queueName
     */
    public function setQueueName(string $queueName)
    {
        // A different queue has not been declared yet by this instance, so the
        // "already declared" flag must not carry over from the previous queue.
        if ($this->queueName !== $queueName)
        {
            $this->declare = false;
        }
        $this->queueName = $queueName;
    }

    /**
     * Fetch the underlying AMQP connection from the pool.
     *
     * NOTE(review): the pooled connection is never handed back to the pool
     * here; confirm whether the pool implementation expects an explicit release.
     *
     * @return AMQPStreamConnection|false false when the pool did not return an AmqpConnection
     */
    public function getConnection()
    {
        $conn = $this->pool->getConnection();
        if (!($conn instanceof AmqpConnection))
        {
            return false;
        }
        return $conn->getConnection();
    }

    /**
     * Open a channel on a pooled connection.
     *
     * @param int|null $channel_id channel id passed through to the connection
     * @return AMQPChannel|false false when no usable connection is available
     */
    public function getChannel($channel_id = null)
    {
        $connection = $this->getConnection();
        if (!is_object($connection))
        {
            return false;
        }
        return $connection->channel($channel_id);
    }

    /**
     * Publish a message to the configured queue through the default exchange.
     *
     * Array bodies are JSON-encoded first. The queue is declared as durable the
     * first time this instance publishes to it. A publish failure with error
     * code 104 (presumably ECONNRESET - confirm) is retried on a freshly fetched
     * channel, at most $maxPublishRetries times.
     *
     * @param string|array $body message content
     * @param array $args message properties passed to AMQPMessage
     * @throws AmqpException when no queue name is set, the body cannot be
     *                       JSON-encoded, no channel is available, or publishing fails
     */
    public function basePublish($body, array $args = [])
    {
        if (empty($this->queueName))
        {
            throw new AmqpException('no queue name defined');
        }

        // Encode once, before any channel is opened, and fail loudly instead of
        // silently publishing `false` as the message body.
        if (is_array($body))
        {
            $body = json_encode($body, JSON_UNESCAPED_UNICODE);
            if ($body === false)
            {
                throw new AmqpException('json encode message body failed: ' . json_last_error_msg());
            }
        }

        for ($attempt = 0; ; $attempt++)
        {
            $channel = $this->getChannel();
            if (!is_object($channel))
            {
                throw new AmqpException('no channel fetched');
            }

            // Declare the queue (passive=false, durable=true, exclusive=false,
            // auto_delete=false, nowait=false) once per queue name.
            if ($this->declare === false)
            {
                $channel->queue_declare($this->queueName, false, true, false, false, false);
                $this->declare = true;
            }

            try
            {
                $channel->basic_publish(new AMQPMessage($body, $args), '', $this->queueName);
            }
            catch (\Throwable $ex)
            {
                $code = $ex->getCode();
                // Bounded retry: the original recursed without limit here.
                if ((int)$code === 104 && $attempt < $this->maxPublishRetries)
                {
                    Service::$logger->info('AMQP try basePublish again');
                    continue;
                }
                throw new AmqpException($ex->getMessage(), $code);
            }

            // Only close after a successful publish; on a failed publish the
            // connection is suspect and a close handshake could block or throw.
            $this->closeChannelQuietly($channel);
            return;
        }
    }

    /**
     * Close a channel opened for a single publish so channels do not pile up
     * on the connection. Failures are logged and otherwise ignored because the
     * message has already been handed to the broker.
     *
     * @param AMQPChannel $channel
     */
    private function closeChannelQuietly($channel)
    {
        try
        {
            $channel->close();
        }
        catch (\Throwable $ex)
        {
            Service::$logger->info('AMQP channel close failed: ' . $ex->getMessage());
        }
    }
}
